[HBase]03 HBASE进阶 三

Hbase数据存储过程优化、Hbase服务端 常用 读写 优化策略、HBase协处理器

Posted by 李玉坤 on 2017-09-23

什么导致hbase性能下降?

  • jvm内存分配与gc回收策略
  • 与hbase运行机制相关的部分配置不合理
  • 表结构设计及用户使用方式不合理

    Hbase概念

    Hbase数据存储过程·

  • Hbase写入时当memstore达到一定的大小会flush到磁盘保存成 HFile, 当hfile小文件太多会执行compact操作进行合并。(当一个hstore里只包含一个hfile时;查询效率才是最大化。因为hfile小文件过多会影响查询时长,合并后才可以提高查询效率。)
  • 当region的大小达到某一阈值之后, 会执行split操作

compaction分两种:

  • Minor compaction: 选取一些小的 、 相邻的storefile将他们合并成一个更大的storefile
  • Major compaction: 将所有的storefile合并成一个storefile, 清理无意义数据、被删除的数据、ttl过期数据 、 版本号超过设定版本号的数据;资源消耗最大。
  • Split: 当一个region达到一定的大小就会自动split成两个region

compact促发条件:

  • Memstore被flush到磁盘,
  • 用户执行shell命令compact, Major_compact 或者调用了相应的api
  • Hbase后台线程周期性触发检查

Hbase优化策略

hbase优化

  • 常见服务端配置优化
  • 常用优化策略 (以实际需求为主)
  • Hbase读/写性能优化

HBase优化策略一:服务端优化策略

  • Jvm设置与gc设置
  • hbase-site. xml 部分属性配置
    max.filesize默认10G;
    majorcompaction默认1天建议0,通过手动合并,因为需要用到很长时间;
    min默认3;


cache.size在偏向读的业务当中可以适当调大一些。
flush.size可以设置大一点;
block.multiplier建议设置成5;如果太大会有内存溢出的危险。

(hbase.hstore.blockingStoreFiles:默认为7,如果任何一个store(非.META.表里的store)的storefile的文件数大于该值,则在flush memstore前先进行split或者compact,同时把该region添加到flushQueue,延时刷新,这期间会阻塞写操作直到compact完成或者超过hbase.hstore.blockingWaitTime(默认90s)配置的时间,可以设置为30,避免memstore不及时flush。当regionserver运行日志中出现大量的“Region <regionName> has too many store files; delaying flush up to 90000ms”时,说明这个值需要调整了)

HBase优化策略二:常用优化策略

HBase常优用化
-预先分区 -RowKey优化 -Column优化 -Schema优化

  • 预先分区
    • 创建hbase表的时候会自动创建一个region分区(hbase默认建一个regionserver上建region;后期数据大会分为两个region)
    • 创建hbase表的时候预先创建一些空的regions(减少io操作;通过预先分区;解决数据倾斜问题;将频繁访问的数据放到多个region中;将不常访问的分区放到一个或几个region中)
  • Rowkey优化
    • 利用hbase默认排序特点, 将一起访问的数据放到一起
    • 防止热点问题(大量的client集中访问一个节点), 避免使用时序或者单调的递增递减等
  • Column优化
    • 列族的名称和列的描述命名尽量简短(过长会占据内存空间)
    • 同一张表中columnfamily的数量不要超过 3 个
  • Schema优化
    • 宽表: 一种 “列多行少” 的设计(事物性能好;hbase的事物建立在行的基础上的;行少插入的时候可以有很好的保障)
    • 高表: 一种 “列少行多” 的设计(查询性能好;查询的条件放入rowkey中我们可以缓存更多的行;元数据来讲开销大因为行多rowkey多)
      **hbase主要在于不必苛刻设计于哪种;主要取决于业务。**

HBase优化策略三:读写优化策略

  • Hbase写优化策略
    • 同步批量提交or异步批量提交
    • WAL优化, 是否必须, 持久化等级

默认时同步提交数据的;异步提交是可能丢失一些数据的;在业允许的情况下可以开启。

WAL默认是开启的;一方面确保数据缓存丢失了也可以数据恢复;另一方面是为了集群间的异步复制;更关注写入的吞吐量的时候可以关闭WAL或者采用异步写入。

  • Hbase读优化策略
    • 客户端: Scan缓存设置, 批量获取
    • 服务端: blockcache配置是否合理, Hfile是否过多
    • 表结构的设计问题

在设置scan检索的时候可以设置scan的cache;scan在检索的时候不会一次将数据加载到本地;而是多次rpc请求加载(防止影响其他业务或者数据量大造成内存溢出);cache默认100条大小设置的大一点可以减少rpc的请求数据次数。

查询数据的时候blockcache如果不能命中;还要去Hfile里拉取数据。
Hfile不能过多;过多会导致查询缓慢;需要compact合并。

生产中做过调整的参数

  • hbase.client.operation.timeout
    该参数表示HBase客户端发起一次数据操作直至得到响应之间总的超时时间
  • phoenix.query.timeoutMs
    Phoenix客户端查询超时
  • phoenix.query.keepAliveMs
    当客户端的线程超过线程池的核心线程数量时,空闲线程等等待任务的默认时间。超过这个时间,空闲线程就会关闭,默认是60s
  • hbase.client.ipc.pool.size
    连接池的大小可以通过hbase.client.ipc.pool.size配置项指定,默认为1。
  • dfs.socket.timeout
    旧版本客户端的超时时间 HBase的SocketTimeoutException错误 调大
  • dfs.client.socket-timeout
    新版本客户端的超时时间
  • hbase.client.operation.timeout
    该参数表示HBase客户端发起一次数据操作直至得到响应之间总的超时时间
  • hbase.master.namespace.init.timeout
    如果一个集群region非常多,默认5分钟可能还分配不到namespace表,还得修改hbase.master.namespace.init.timeout超时时间
  • hbase.regionserver.executor.openregion.threads
    开打用户表region的线程数量 默认3
  • index.builder.threads.max 默认10
    根据主表的更新来确定更新索引表的线程数

HBase协处理器简介

  • HBase coprocessor
    • Hbase协处理器受bigtable协处理器的启发, 为用户提供类库和运行时环境, 使得代码能够在hbase regionserver和master上处理
    • 系统协处理器and表协处理器(coprocessor分为两类协处理器)
    • Observer and Endpoint(HBase 提供的coprocessor插件)
    • 系统协处理器: 全局加载到regionserver托管的所有表和region上(针对整个集群)
    • 表协处理器: 用户可以指定一张表使用协处理器(针对单张表)
    • 观察者 (Observer): 类似于关系数据库的触发器
    • 终端 (Endpoint): 动态的终端有点像存储过程
  • Observer
    • Regionobserver: 提供客户端的数据操纵事件钩子: Get, Put,Delete, Scan等
    • Masterobserver: 提供DDL类型的操作钩子。如创建、删除修改数据表等
    • Walobserver: 提供wal相关操作钩子
  • Observer应用场景
    • 安全性: 例如执行get或put操作前, 通过preget或preput方法检查, 是否允许该操作
    • 引用完整性约束: hbase并不支持关系型数据库中的引用完整性约束概念, 即通常所说的外键;我们可以使用协处理器增强这种约束
  • Endpoint
    • endpoint是动态rpc插件的接口, 它的实现代码被安装在服务器端, 从而能够通过hbase Rpc唤醒
    • 调用接口, 它们的实现代码会被目标regionserver远程执行·
    • 典型的案例: 一个大table有几百个region, 需要计算某列的平均值或者总和

HBase实战:开发RegionObserver协处理器

◆实现一个endpoint类型的协处理器

  • 待整理

◆实现一个regionobserver类型的协处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package com.kun.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;



/**
* Created by jixin on 18-2-25.
*/
public class RegionObserverTest extends BaseRegionObserver {

private byte[] columnFamily = Bytes.toBytes("cf");
private byte[] countCol = Bytes.toBytes("countCol");
private byte[] unDeleteCol = Bytes.toBytes("unDeleteCol");
private RegionCoprocessorEnvironment environment;

//regionserver 打开region前执行
@Override
public void start(CoprocessorEnvironment e) throws IOException {
environment = (RegionCoprocessorEnvironment) e;
}

//RegionServer关闭region前调用
@Override
public void stop(CoprocessorEnvironment e) throws IOException {

}

/**
* 需求一
* 1. cf:countCol 进行累加操作。 每次插入的时候都要与之前的值进行相加
* 需要重载prePut方法
*/

@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
Durability durability) throws IOException {
if (put.has(columnFamily, countCol)) {//获取old countcol value
Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
int oldNum = 0;
for (Cell cell : rs.rawCells()) {
if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
oldNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
}
}

//获取new countcol value
List<Cell> cells = put.get(columnFamily, countCol);
int newNum = 0;
for (Cell cell : cells) {
if (CellUtil.matchingColumn(cell, columnFamily, countCol)) {
newNum = Integer.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));
}
}

//sum AND update Put实例
put.addColumn(columnFamily, countCol, Bytes.toBytes(String.valueOf(oldNum + newNum)));
}
}

/**
* 需求二
* 2. 不能直接删除unDeleteCol 删除countCol的时候将unDeleteCol一同删除
* 需要重载preDelete方法
*/
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
WALEdit edit,
Durability durability) throws IOException {
//判断是否操作删除了cf列族
List<Cell> cells = delete.getFamilyCellMap().get(columnFamily);
if (cells == null || cells.size() == 0) {
return;
}

boolean deleteFlag = false;
for (Cell cell : cells) {
byte[] qualifier = CellUtil.cloneQualifier(cell);

if (Arrays.equals(qualifier, unDeleteCol)) {
throw new IOException("can not delete unDel column");
}

if (Arrays.equals(qualifier, countCol)) {
deleteFlag = true;
}
}

if (deleteFlag) {
delete.addColumn(columnFamily, unDeleteCol);
}
}

}

HBase实战:HBase协处理器加载

  • 待整理
    • 配置文件加载: 即通过hbase-site. Xml文件配置加载, 一般这样的协处理器是系统级别的.


  • Shell加载: 可以通过alter命令来对表进行schema修改来加载协处理器
  • shell加载;jar包|全类名|优先级(会自动分配)|协处理器相关参数

首先对regionobserver进行编译 mvn clean install
上传到hdfs集群中
测试协处理器







  • 通过api代码加载: 即通过api的方式来加载协处理器
    配置文件加载

Hbase卸载/更新协处理器

  • 重复加载的第二个coprocessor实例不会发挥作用
  • 要完成卸载/更新就需要重启JVM, 也就是重启regionserver